{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Petastorm Hello World Examples\n",
    "\n",
    "In this notebook we will introduce Uber's Petastorm library (https://github.com/uber/petastorm) for creating training datasets for deep learning. We will go over some hello world examples and see how a petstorm store differs from other type of storage formats.\n",
    "\n",
    "### Motivation\n",
    "\n",
    "Petastorm is an open source **data access library**. The main motivation for this library is to make it easier for data scienstists to work with big data stored in Hadoop-like data lakes. The benefits of Petastorm are the following:\n",
    "\n",
    "- It enables to use a single data format that can be used for both Tensorflow and PyTorch datasets (before Petastorm users on Hopsworks would typically have one dataset in tfrecords for training with Tensorflow and another in hdf5 for training with PyTorch). This reduces the number of ETL steps necessary to prepare data for deep learning.\n",
    "\n",
    "- Petastorm datasets integrate very well in Apache Spark, the main processing engine used in Hopsworks. Petastorm datasets are built on top of Parquet, which has better support in Spark than for example TFRecords or HDF5.\n",
    "\n",
    "- A Petastorm dataset is self-contained, the data is stored together with its schema, which means that a data scientist can read a dataset into tensorflow or Pytorch without having to specify the schema to parse the data. As compared to TFRecords, where you need the schema at read-time, and if any discrepancy between your schema and the data on disk you might run into erros where you have to manually inspect protobuf files to figure out the serialization errors. With Petastorm, the API looks a little bit like a database, you do not need to know anything a-priori about the schema of the data you just call `make_reader()` and then the library will use the metadata to infer everything for you. **Note**: When you create a petastorm in the first place and write it to disk you pay the price for the simplicity for *reading* petastorm datasets, i.e when you write Petastorm datasets you have to be very explicit with specifying the schema and other configuration.\n",
    "\n",
    "- The dataset is optimized for filesystems like HDFS, streaming records from large files rather than using billions of small files. (Although HopsFS is a bit special in this regard as it can deal very well with both large and small files (https://www.logicalclocks.com/fixing-the-small-files-problem-in-hdfs/))\n",
    "\n",
    "- Petastorm is supported in Hopsworks Feature Store\n",
    "\n",
    "- When training deep learning models it is important that you can stream data in a way taht does not starve your GPUs, Petastorm is designed to be performant and usable for deep learning from the beginning. Moreover, petastorm have support partitioning data to optimize for distributed deep learning\n",
    "\n",
    "\n",
    "![Petastorm 1](./../images/petastorm1.png \"Petastorm 1\")\n",
    "\n",
    "### Background\n",
    "\n",
    "TLDR; A petastorm dataset is a Parquet dataset with extended metadata.\n",
    "\n",
    "Petastorm is built on top of Parquet files. Parquet is a columnar data format with great support for Spark and other big data applications such as Hive or HDFS. However, Parquet is not supported natively by deep learning frameworks such as Tensorflow or PyTorch. Morover, Parquet has some buit-in primitive data types like integer, string and binary, but it does not include higher-order tensors which is typical in datasets for deep learning. Petastorm solves this by adding a data-access library on top of Parquet and storing extended metadata as a custom-field in the Parquet schema in the footer, which enables to store tensor data in Parquet files.\n",
    "\n",
    "Petastorm stores tensors as binary blobs in Parquet but also stores the necessary schema with information about the name of the tensors, the shape/dimensions, and the data type inside the tensor. Petastorm also provides a set of codecs for specifying how to compress the  data. All of this is specified in a Petastorm schema called a **Unischema**.\n",
    "\n",
    "![Petastorm 2](./../images/petastorm2.png \"Petastorm 2\")\n",
    "\n",
    "To be able to utilize the added metadata to the Parquet we use the Petastorm client library which uses Apache Arrow to manipulat the dataset in memory:\n",
    "\n",
    "```python\n",
    "import petastorm\n",
    "```"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Generating A  Sample Petastorm Dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "from hops import hdfs, featurestore\n",
    "import pyarrow as pa\n",
    "import random\n",
    "import pandas as pd\n",
    "from pyspark.sql import SQLContext, Row\n",
    "from pyspark.sql.types import StructType, StructField, IntegerType\n",
    "\n",
    "# IMPORTANT: must import  tensorflow before petastorm.tf_utils due to a bug in petastorm\n",
    "import tensorflow as tf\n",
    "from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField\n",
    "from petastorm.codecs import ScalarCodec, CompressedImageCodec, NdarrayCodec\n",
    "from pyspark.sql.types import StructType, StructField, IntegerType\n",
    "from petastorm.etl.dataset_metadata import materialize_dataset\n",
    "from petastorm.spark_utils import dataset_as_rdd\n",
    "from petastorm import make_reader\n",
    "from petastorm.tf_utils import tf_tensors, make_petastorm_dataset\n",
    "from petastorm.pytorch import DataLoader\n",
    "from petastorm import make_batch_reader"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Specify the Petastorm Schema\n",
    "\n",
    "Below we specify a sample petastorm schema with three fields, including multi-dimensional tensors which are not supported natively by Parquet. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The schema defines how the dataset schema looks like\n",
    "HelloWorldSchema = Unischema('HelloWorldSchema', [\n",
    "    UnischemaField('id', np.int32, (), ScalarCodec(IntegerType()), False),\n",
    "    UnischemaField('image1', np.uint8, (128, 256, 3), CompressedImageCodec('png'), False),\n",
    "    UnischemaField('array_4d', np.uint8, (None, 128, 30, None), NdarrayCodec(), False),\n",
    "])"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The Petastorm Unischema is designed so that it can be converted to Spark/Numpy/Tensorflow/Pytorch schemas"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "StructType(List(StructField(array_4d,BinaryType,false),StructField(id,IntegerType,false),StructField(image1,BinaryType,false)))"
     ]
    }
   ],
   "source": [
    "HelloWorldSchema.as_spark_schema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Create a Spark Dataframe that Conforms to the Petastorm Schema and Write the Petastorm Dataset\n",
    "\n",
    "As petastorm datasets are built on top of Parquet files we can use Spark in combination with the context manager `materialize_dataset`. \n",
    "\n",
    "However, first we must create a spark dataframe that conforms to the Petastorm schema. To  do this we can first create a an rdd of dicts and use `dict_to-spark_row(Unischema, dict-row)`. We could also have created the spark dataframe directly from `HelloWorldSchema.as_spark_schema()`. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "OUTPUT_URL = hdfs.project_path() + \"Resources/hello_world\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "def row_generator(x):\n",
    "    \"\"\"Returns a single entry in the generated dataset. Return a bunch of random values as an example.\"\"\"\n",
    "    return {'id': x,\n",
    "            'image1': np.random.randint(0, 255, dtype=np.uint8, size=(128, 256, 3)),\n",
    "            'array_4d': np.random.randint(0, 255, dtype=np.uint8, size=(4, 128, 30, 3))}\n",
    "\n",
    "\n",
    "def generate_petastorm_dataset():\n",
    "    \"\"\" Generates a petastorm dataset and saves it to HopsFS using Spark and materialize_dataset context manager \"\"\"\n",
    "    rowgroup_size_mb = 256\n",
    "    # Wrap dataset materialization portion. Will take care of setting up spark environment variables as\n",
    "    # well as save petastorm specific metadata\n",
    "    rows_count = 10\n",
    "    filesystem_factory= lambda: pa.hdfs.connect(driver='libhdfs')\n",
    "    with materialize_dataset(spark, OUTPUT_URL, HelloWorldSchema, rowgroup_size_mb,filesystem_factory=filesystem_factory):\n",
    "\n",
    "        rows_rdd = sc.parallelize(range(rows_count))\\\n",
    "            .map(row_generator)\\\n",
    "            .map(lambda x: dict_to_spark_row(HelloWorldSchema, x))\n",
    "\n",
    "        spark.createDataFrame(rows_rdd, HelloWorldSchema.as_spark_schema()) \\\n",
    "            .coalesce(10) \\\n",
    "            .write \\\n",
    "            .mode('overwrite') \\\n",
    "            .parquet(OUTPUT_URL)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "By writing the parquet files (`spark.write.parquet()`) using the petastorm context manager `materialize_dataset()`, the petastorm library will take care of writing out the necessary petastorm-specific metadata to the parquet files at the end. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "generate_petastorm_dataset()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Alternative Ways of Creating Spark Data Frames that conforms to a Petastorm Schema\n",
    "\n",
    "You don't have to use the `dict_to_spark_row` and `spark.createDataFrame(rdd, schema.as_spark_schema())`, these are just helper methods to make sure that your Unischema actually is consistent with the spark schema. An alternative way is illustrated below (with less gurantees about consistency)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "sql_context = SQLContext(sc)\n",
    "pandas_df = pd.DataFrame(np.random.randint(0,100,size=(100, 4)), columns=list('ABCD'))\n",
    "spark_df = sql_context.createDataFrame(pandas_df)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- A: long (nullable = true)\n",
      " |-- B: long (nullable = true)\n",
      " |-- C: long (nullable = true)\n",
      " |-- D: long (nullable = true)"
     ]
    }
   ],
   "source": [
    "spark_df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "TestSchema = Unischema('TestSchema', [\n",
    "    UnischemaField('A', np.int32, (), ScalarCodec(IntegerType()), False),\n",
    "    UnischemaField('B', np.int32, (), ScalarCodec(IntegerType()), False),\n",
    "    UnischemaField('C', np.int32, (), ScalarCodec(IntegerType()), False),\n",
    "    UnischemaField('D', np.int32, (), ScalarCodec(IntegerType()), False)\n",
    "])\n",
    "TEST_URL = hdfs.project_path() + \"Logs/test.petastorm\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "filesystem_factory= lambda: pa.hdfs.connect(driver='libhdfs')\n",
    "with materialize_dataset(spark, TEST_URL, TestSchema, filesystem_factory=filesystem_factory):\n",
    "    spark_df.write.mode('overwrite').parquet(TEST_URL)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Read a Petastorm Dataset\n",
    "\n",
    "A petastorm dataset can be read directly with Spark, Pytorch or Tensorflow using the Petastorm library. This is really where the Petastorm data format shines. Due to being so explicit with schema when writing the dataset, reading the data is very simple."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading a Petastorm Dataset using plain python"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "def python_hello_world():\n",
    "    \"\"\" Creates a python reader to read a petastorm dataset\"\"\"\n",
    "    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs') as reader:\n",
    "        # Pure python\n",
    "        for sample in reader:\n",
    "            print(sample.id)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "5\n",
      "6\n",
      "7\n",
      "8\n",
      "9\n",
      "0\n",
      "1\n",
      "2\n",
      "3\n",
      "4"
     ]
    }
   ],
   "source": [
    "python_hello_world()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading a Petastorm Dataset using Pyspark\n",
    "\n",
    "To read a petastorm dataset in pyspspark we can either read dataframe directly with spark `spark.read.parquet(OUTPUT_URL)`, however this does not utilize the added metadata that petastorm creates. We can also read a petastorm dataset into pyspark by using Petastorm's utility methods, such as `dataset_as_rdd`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "def pyspark_hello_world():\n",
    "    \"\"\" Reads a petastorm dataset into spark rdd and dataframe\"\"\"\n",
    "    # dataset_as_rdd creates an rdd of named tuples.\n",
    "    rdd = dataset_as_rdd(OUTPUT_URL, spark, [HelloWorldSchema.id, HelloWorldSchema.image1], hdfs_driver=\"libhdfs\")\n",
    "    print('An id in the dataset: ', rdd.first().id)\n",
    "\n",
    "    # Create a dataframe object from a parquet file\n",
    "    dataframe = spark.read.parquet(OUTPUT_URL)\n",
    "\n",
    "    # Show a schema\n",
    "    dataframe.printSchema()\n",
    "\n",
    "    # Count all\n",
    "    dataframe.count()\n",
    "\n",
    "    # Show just some columns\n",
    "    dataframe.select('id').show()\n",
    "\n",
    "    # This is how you can use a standard SQL to query a dataset. Note that the data is not decoded in this case.\n",
    "    number_of_rows = spark.sql(\n",
    "        'SELECT count(id) '\n",
    "        'from parquet.`{}` '.format(OUTPUT_URL)).collect()\n",
    "    print('Number of rows in the dataset: {}'.format(number_of_rows[0][0]))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "An id in the dataset:  0\n",
      "root\n",
      " |-- array_4d: binary (nullable = true)\n",
      " |-- id: integer (nullable = true)\n",
      " |-- image1: binary (nullable = true)\n",
      "\n",
      "+---+\n",
      "| id|\n",
      "+---+\n",
      "|  0|\n",
      "|  1|\n",
      "|  2|\n",
      "|  3|\n",
      "|  4|\n",
      "|  5|\n",
      "|  6|\n",
      "|  7|\n",
      "|  8|\n",
      "|  9|\n",
      "+---+\n",
      "\n",
      "Number of rows in the dataset: 10"
     ]
    }
   ],
   "source": [
    "pyspark_hello_world()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading a Petastorm Dataset using Tensorflow\n",
    "\n",
    "Petastorm enables to store multi-dimensional tensors (e.g images) as Parquet and then read it directly in Tensorflow using a very simple API that supports both the old TF-API and the tf.Dataset API. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "def tensorflow_hello_world():\n",
    "    \"\"\" Creates a tensorflow reader for reading a petastorm dataset \"\"\"\n",
    "    # Example: tf_tensors will return tensors with dataset data\n",
    "    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs') as reader:\n",
    "        tensor = tf_tensors(reader)\n",
    "        with tf.Session() as sess:\n",
    "            sample = sess.run(tensor)\n",
    "            print(sample.id)\n",
    "\n",
    "    # Example: use tf.data.Dataset API\n",
    "    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs') as reader:\n",
    "        dataset = make_petastorm_dataset(reader)\n",
    "        iterator = dataset.make_one_shot_iterator()\n",
    "        tensor = iterator.get_next()\n",
    "        with tf.Session() as sess:\n",
    "            sample = sess.run(tensor)\n",
    "            print(sample.id)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "5\n",
      "5"
     ]
    }
   ],
   "source": [
    "tensorflow_hello_world()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Reading a Petastorm Dataset using PyTorch"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [],
   "source": [
    "def pytorch_hello_world():\n",
    "    \"\"\" Creates a PyTorch reader for reading a petastorm dataset \"\"\"\n",
    "    with DataLoader(make_reader(OUTPUT_URL, hdfs_driver=\"libhdfs\")) as train_loader:\n",
    "        sample = next(iter(train_loader))\n",
    "        print(sample['id'])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "tensor([0], dtype=torch.int32)"
     ]
    }
   ],
   "source": [
    "pytorch_hello_world()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### External Datasets in Petastorm\n",
    "\n",
    "`external dataset` in Petastorm library refers to existing Parquet-stores that do not include the extended Petastorm metadata. Petastorm library is able to read such datasets as well by using `make_batch_reader`.\n",
    "\n",
    "`make_batch_reader` works with any parquet stores and returns batches of records, as opposed to `make_reader` that only works with Petastorm datasets and returns one record at a time. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Generate an external Petastorm dataset (regular Parquet store)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 24,
   "metadata": {},
   "outputs": [],
   "source": [
    "OUTPUT_URL2 = hdfs.project_path() + \"Resources/hello_world_external\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [],
   "source": [
    "def row_generator_external(x):\n",
    "    \"\"\"Returns a single entry in the generated dataset. Return a bunch of random values as an example.\"\"\"\n",
    "    return Row(id=x, value1=random.randint(-255, 255), value2=random.randint(-255, 255))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [],
   "source": [
    "def generate_external_dataset():\n",
    "    \"\"\"Creates an example dataset at output_url in Parquet format\"\"\"\n",
    "\n",
    "    rows_count = 10\n",
    "    rows_rdd = sc.parallelize(range(rows_count))\\\n",
    "        .map(row_generator_external)\n",
    "\n",
    "    spark.createDataFrame(rows_rdd).\\\n",
    "        write.\\\n",
    "        mode('overwrite').\\\n",
    "        parquet(OUTPUT_URL2)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [],
   "source": [
    "generate_external_dataset()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read External Dataset using Plain Python and the Petastorm API"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [],
   "source": [
    "def python_hello_world_external():\n",
    "    \"\"\" Creates a python reader for reading a regular parquet store using Petastorm library\"\"\"\n",
    "    # Reading data from the non-Petastorm Parquet via pure Python\n",
    "    with make_batch_reader(OUTPUT_URL2, schema_fields=[\"id\", \"value1\", \"value2\"], hdfs_driver='libhdfs') as reader:\n",
    "        for schema_view in reader:\n",
    "            # make_batch_reader() returns batches of rows instead of individual rows\n",
    "            print(\"Batched read:\\nid: {0} value1: {1} value2: {2}\".format(\n",
    "                schema_view.id, schema_view.value1, schema_view.value2))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Batched read:\n",
      "id: [5 6 7 8 9] value1: [-189   51 -169  244   81] value2: [  72 -175    8  -17 -134]\n",
      "Batched read:\n",
      "id: [0 1 2 3 4] value1: [-189   51 -169  244   81] value2: [  72 -175    8  -17 -134]"
     ]
    }
   ],
   "source": [
    "python_hello_world_external()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read External Dataset using Pytorch and the Petastorm API"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [],
   "source": [
    "def pytorch_hello_world_external():\n",
    "    \"\"\" Creates a pyTorch reader for reading a regular parquet store using Petastorm library\"\"\"\n",
    "    with DataLoader(make_batch_reader(OUTPUT_URL2, hdfs_driver='libhdfs')) as train_loader:\n",
    "        sample = next(iter(train_loader))\n",
    "        # Because we are using make_batch_reader(), each read returns a batch of rows instead of a single row\n",
    "        print(\"id batch: {0}\".format(sample['id']))\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "id batch: tensor([[0, 1, 2, 3, 4]])"
     ]
    }
   ],
   "source": [
    "pytorch_hello_world_external()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read External Dataset using Tensorflow and the Petastorm API"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [],
   "source": [
    "def tensorflow_hello_world_external():\n",
    "    \"\"\" Creates a Tensorflow reader for reading a regular parquet store using Petastorm library\"\"\"\n",
    "    # Example: tf_tensors will return tensors with dataset data\n",
    "    with make_batch_reader(OUTPUT_URL2, hdfs_driver='libhdfs') as reader:\n",
    "        tensor = tf_tensors(reader)\n",
    "        with tf.Session() as sess:\n",
    "            # Because we are using make_batch_reader(), each read returns a batch of rows instead of a single row\n",
    "            batched_sample = sess.run(tensor)\n",
    "            print(\"id batch: {0}\".format(batched_sample.id))\n",
    "    # Example: use tf.data.Dataset API\n",
    "    with make_batch_reader(OUTPUT_URL2, hdfs_driver='libhdfs') as reader:\n",
    "        dataset = make_petastorm_dataset(reader)\n",
    "        iterator = dataset.make_one_shot_iterator()\n",
    "        tensor = iterator.get_next()\n",
    "        with tf.Session() as sess:\n",
    "            batched_sample = sess.run(tensor)\n",
    "            print(\"id batch: {0}\".format(batched_sample.id))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "id batch: [0 1 2 3 4]\n",
      "id batch: [0 1 2 3 4]"
     ]
    }
   ],
   "source": [
    "tensorflow_hello_world_external()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Advanced Features \n",
    "\n",
    "Petastorm also support more advanced features for data access:\n",
    "\n",
    "- Selective column selection, since petastorm is built on a columnat format (Parquet) selective column selection is efficient (as opposed to row-based formats like TF-Records where you must parse the entire row in to memory to do column-selection).\n",
    "\n",
    "- Parallelized reads\n",
    "\n",
    "- Dataset Sharding for distributed training\n",
    "\n",
    "- N-grams (windowing) support. If your data is sorted by time-stamp, Petstorm can efficiently do I/O and autoamtically window it into n-grams for you.  \n",
    "\n",
    "- Row filtering (row predicates)\n",
    "\n",
    "- Shuffling, when doing machine learning shuffling is an important step to avoid that you introduce artifical correlations in the dataset based on ordering"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Selective Column Selection"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "def tensorflow_selective_column_selection(schema_fields):\n",
    "    \"\"\" Tensorflow Selective Column Selection \"\"\"\n",
    "\n",
    "    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs', schema_fields=schema_fields) as reader:\n",
    "        dataset = make_petastorm_dataset(reader)\n",
    "        iterator = dataset.make_one_shot_iterator()\n",
    "        tensor = iterator.get_next()\n",
    "        with tf.Session() as sess:\n",
    "            sample = sess.run(tensor)\n",
    "            print(\"Number of columns: {}\".format(len(sample)))\n",
    "            print(\"fields:\")\n",
    "            print(sample._fields)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [],
   "source": [
    "OUTPUT_URL = hdfs.project_path() + \"Resources/hello_world\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Number of columns: 3\n",
      "fields:\n",
      "('array_4d', 'id', 'image1')"
     ]
    }
   ],
   "source": [
    "tensorflow_selective_column_selection(None) #read all fields"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Number of columns: 2\n",
      "fields:\n",
      "('array_4d', 'id')"
     ]
    }
   ],
   "source": [
    "tensorflow_selective_column_selection([\"array_4d\", \"id\"]) #read only fields 'array_4d' and 'id"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Parallelized reads"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [],
   "source": [
    "def tensorflow_thread_pool_read(reader_pool_type, workers_count):\n",
    "    \"\"\" Tensorflow Parallel Read with Threads \"\"\"\n",
    "\n",
    "    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs', reader_pool_type = reader_pool_type, \n",
    "                     workers_count=workers_count) as reader:\n",
    "        dataset = make_petastorm_dataset(reader)\n",
    "        iterator = dataset.make_one_shot_iterator()\n",
    "        tensor = iterator.get_next()\n",
    "        with tf.Session() as sess:\n",
    "            sample = sess.run(tensor)\n",
    "            print(sample._fields)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('array_4d', 'id', 'image1')"
     ]
    }
   ],
   "source": [
    "tensorflow_thread_pool_read(\"thread\", 15) # thread pool of size 15 for reading"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 40,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('array_4d', 'id', 'image1')"
     ]
    }
   ],
   "source": [
    "tensorflow_thread_pool_read(\"thread\", 5) # process pool of size 5 for reading"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Dataset Sharding"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 41,
   "metadata": {},
   "outputs": [],
   "source": [
    "def tensorflow_dataset_sharding(shard_count, cur_shard):\n",
    "    \"\"\" \n",
    "    Tensorflow dataset sharding \n",
    "    \n",
    "    Args:\n",
    "        :shard_count: An int denoting the number of shards to break this dataset into. Defaults to None\n",
    "        :cur_shard: An int denoting the current shard number. \n",
    "                    Each node reading a shard should pass in a unique shard number in the range [0, shard_count). \n",
    "                    shard_count must be supplied as well.\n",
    "    \"\"\"\n",
    "\n",
    "    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs', shard_count=shard_count, \n",
    "                     cur_shard=cur_shard) as reader:\n",
    "        dataset = make_petastorm_dataset(reader)\n",
    "        iterator = dataset.make_one_shot_iterator()\n",
    "        tensor = iterator.get_next()\n",
    "        with tf.Session() as sess:\n",
    "            sample = sess.run(tensor)\n",
    "            print(sample._fields)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('array_4d', 'id', 'image1')"
     ]
    }
   ],
   "source": [
    "tensorflow_dataset_sharding(2, 1) # shard dataset into 2 shards and read the first shard"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Row Predicates"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [],
   "source": [
    "def tensorflow_dataset_sharding(predicate):\n",
    "    \"\"\" \n",
    "    Tensorflow reader with row predicate\n",
    "    \"\"\"\n",
    "\n",
    "    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs', predicate=predicate) as reader:\n",
    "        dataset = make_petastorm_dataset(reader)\n",
    "        iterator = dataset.make_one_shot_iterator()\n",
    "        tensor = iterator.get_next()\n",
    "        with tf.Session() as sess:\n",
    "            sample = sess.run(tensor)\n",
    "            print(sample.id)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "5"
     ]
    }
   ],
   "source": [
    "#tensorflow_dataset_sharding(predicate = lambda x: x.id == 5)\n",
    "from petastorm.predicates import in_lambda\n",
    "predicate = in_lambda([\"id\"], lambda id: id==5)\n",
    "tensorflow_dataset_sharding(predicate)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Shuffling "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [],
   "source": [
    "def tensorflow_dataset_shuffling(shuffle_row_groups, shuffle_row_drop_partitions):\n",
    "    \"\"\" \n",
    "    Tensorflow dataset shuffling\n",
    "    \n",
    "    Args:\n",
    "         :shuffle_row_groups: Whether to shuffle row groups (the order in which full row groups are read)\n",
    "         :shuffle_row_drop_partitions: This is is a positive integer which determines how many partitions \n",
    "                                       to break up a row group into for increased shuffling in exchange for \n",
    "                                       worse performance (extra reads). For example if you specify \n",
    "                                       2 each row group read will drop half of the rows within every \n",
    "                                       row group and read the remaining rows in separate reads. \n",
    "                                       It is recommended to keep this number below the regular row group \n",
    "                                       size in order to not waste reads which drop all rows.\n",
    "    \"\"\"\n",
    "\n",
    "    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs', predicate=predicate) as reader:\n",
    "        dataset = make_petastorm_dataset(reader)\n",
    "        iterator = dataset.make_one_shot_iterator()\n",
    "        tensor = iterator.get_next()\n",
    "        with tf.Session() as sess:\n",
    "            sample = sess.run(tensor)\n",
    "            print(sample._fields)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "('array_4d', 'id', 'image1')"
     ]
    }
   ],
   "source": [
    "tensorflow_dataset_shuffling(True, 5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Integration with the Feature Store\n",
    "\n",
    "Petastorm is a supported format for saving training datasets in the feature store. To save a spark dataframe in the Petastorm format in the feature store, use the method `featurestore.create_training_dataset()` and supply the dataframe together with petastorm arguments in a dict `petastorm_args`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {},
   "outputs": [],
   "source": [
    "sql_context = SQLContext(sc)\n",
    "pandas_df = pd.DataFrame(np.random.randint(0,100,size=(100, 4)), columns=list('ABCD'))\n",
    "spark_df = sql_context.createDataFrame(pandas_df)\n",
    "TestSchema = Unischema('TestSchema', [\n",
    "    UnischemaField('A', np.int32, (), ScalarCodec(IntegerType()), False),\n",
    "    UnischemaField('B', np.int32, (), ScalarCodec(IntegerType()), False),\n",
    "    UnischemaField('C', np.int32, (), ScalarCodec(IntegerType()), False),\n",
    "    UnischemaField('D', np.int32, (), ScalarCodec(IntegerType()), False)\n",
    "])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "computing descriptive statistics for : petastorm_hello_world\n",
      "computing feature correlation for: petastorm_hello_world\n",
      "computing feature histograms for: petastorm_hello_world\n",
      "computing cluster analysis for: petastorm_hello_world"
     ]
    }
   ],
   "source": [
    "petastorm_args = {\"schema\": TestSchema}\n",
    "featurestore.create_training_dataset(spark_df, \"petastorm_hello_world\", data_format=\"petastorm\",\n",
    "                                     petastorm_args=petastorm_args)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 49,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- A: long (nullable = true)\n",
      " |-- B: long (nullable = true)\n",
      " |-- C: long (nullable = true)\n",
      " |-- D: long (nullable = true)"
     ]
    }
   ],
   "source": [
    "df = featurestore.get_training_dataset(\"petastorm_hello_world\")\n",
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}